-
Notifications
You must be signed in to change notification settings - Fork 92
Conversation
pktfwd/manager.go
Outdated
func (m *Manager) startRoutines(bgCtx context.Context, err chan error, runTime time.Time) { | ||
var errC = make(chan error) | ||
var errC = make(chan error, 4) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why should the channel not block anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the situation where statusRoutine
, networkRoutine
and gpsRoutine
might all receive an error at the same time, they'll all try to pass it to errC
at the same time, which will block these goroutines (since errC
is of size 1 currently) and prevent them from receiving the <-bgCtx.Done()
information. This means we won't be able to shut these goroutines gracefully. It would probably be better if we let those routines shoot their error, and then - since their cancel function will be called - let them shut gracefully.
You're also making me realize that closing the channel will make the goroutines panic in that situation, so if that's good for you, I'll just remove this close()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it cleaner for the methods to return their own error channel? This results in multiple error channels, but at least the error owner (i.e. the function) controls the capacity, whether or not to block and close the channel. This is also safer against sending after closing, as the function closes it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The mutexes like this feel a bit sketchy.
Please also consider one goroutine that receives from a channel for actions (functions) to be performed on the connection. That's more asynchronous, but I'm not sure if that's best in this context
pktfwd/network.go
Outdated
c.routerMutex.Lock() | ||
c.connectToStreams(router.NewRouterClientForGateway(router.NewRouterClient(c.currentRouterConn), c.runConfig.ID, c.token), true) | ||
c.routerMutex.Unlock() | ||
c.uplinkMutex.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unlock in reverse order
pktfwd/network.go
Outdated
c.connectToStreams(router.NewRouterClientForGateway(router.NewRouterClient(c.currentRouterConn), c.runConfig.ID, c.token), true) | ||
c.routerMutex.Unlock() | ||
c.uplinkMutex.Unlock() | ||
c.downlinkStreamChange <- true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feels a bit like a callback to me; isn't that cleaner?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It sort of is yes. The idea behind this was that gRPC streams are implemented as channels in Go. When waiting for a downlink, the select
block is stuck on a specific channel - downlinkStreamChange
signals that the stream needs to be refreshed, and fits well within the select
block. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
pktfwd/manager.go
Outdated
func (m *Manager) startRoutines(bgCtx context.Context, err chan error, runTime time.Time) { | ||
var errC = make(chan error) | ||
var errC = make(chan error, 4) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it cleaner for the methods to return their own error channel? This results in multiple error channels, but at least the error owner (i.e. the function) controls the capacity, whether or not to block and close the channel. This is also safer against sending after closing, as the function closes it
pktfwd/network.go
Outdated
c.uplinkMutex.Lock() | ||
c.connectToStreams(router.NewRouterClientForGateway(router.NewRouterClient(routerConn), gatewayID, c.token), true) | ||
c.uplinkMutex.Unlock() | ||
c.routerMutex.Lock() | ||
c.currentRouterConn = routerConn |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Afaik all state in a struct in current
; no need to prepend a field with that
|
pktfwd/manager.go
Outdated
err := m.netClient.RefreshRoutine(bgCtx) | ||
if err != nil { | ||
errC <- errors.Wrap(err, "Couldn't refresh account server token") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant make an error channel here, and return chan error
. This function is owner of that channel and controls open, close, sending and capacity
pktfwd/network.go
Outdated
return connectionHealthCheck(c.currentRouterConn) | ||
c.networkMutex.Lock() | ||
t, err := connectionHealthCheck(c.routerConn) | ||
c.networkMutex.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Defer as much as you can, like here
pktfwd/network.go
Outdated
close(c.routerChanges) | ||
c.streamsMutex.Lock() | ||
c.disconnectOfStreams() | ||
c.streamsMutex.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Defer
pktfwd/network.go
Outdated
if err := routerChange(c); err != nil { | ||
c.ctx.WithError(err).Warn("Couldn't operate network client change") | ||
} | ||
c.connectToStreams(router.NewRouterClientForGateway(router.NewRouterClient(c.routerConn), c.runConfig.ID, c.token)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be executed if routerChange(c)
failed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check proper closing of channels
Also consider adding one item as capacity to the error channels; there's no reason to block and wait for error handling
pktfwd/manager.go
Outdated
|
||
select { | ||
case <-bgCtx.Done(): | ||
errC <- nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You may close it here?
pktfwd/manager.go
Outdated
for { | ||
select { | ||
case <-bgCtx.Done(): | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Close the error channel?
errC <- errors.Wrap(err, "Gateway status transmission error") | ||
return | ||
} | ||
case <-bgCtx.Done(): | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Idem
ecd50d6
to
fce2d62
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One little channel thing + check all occurrences of closing error channels when you can
pktfwd/manager.go
Outdated
go func() { | ||
if err := m.netClient.RefreshRoutine(bgCtx); err != nil { | ||
errC <- errors.Wrap(err, "Couldn't refresh account server token") | ||
close(errC) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the defer close(errC)
approach; if you don't apply it here then the channel stays open
pktfwd/manager.go
Outdated
@@ -95,6 +95,8 @@ func (m *Manager) handler(runStart time.Time) (err error) { | |||
m.ctx.Error("Program ended after one of the network links failed") | |||
} | |||
|
|||
close(c) | |||
close(routinesErr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use defer
. Also, why initialise routinesErr
like that?
Is merge OK? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
The main addition of this PR is to add a refresh of the account server token before its expiry, which is the suspected cause of #28. It also contains a few refactors and corrections in the network components.
With this fix, token expiry is saved in the network client. A routine is started by the global manager, that updates the token (and the clients created with this token) 2 minutes before a token expires. If the token cannot be refreshed, it is considered as a fatal error and shuts the packet forwarder down.